package cn.tiakon.dmp.report

import cn.tiakon.dmp.untils.{ContextUtils, Utils}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
  * Reads a parquet dataset, counts records per (province, city) pair, and
  * writes the aggregated result to disk as JSON.
  *
  * Config keys (resolved via Typesafe Config from the classpath):
  *   - output.parquet.path : input parquet directory for this job
  *   - output.json.path    : output directory for the JSON result
  *
  * @author Tiakon
  *         2018/3/29 7:39
  */
object Sql2Core {

  def main(args: Array[String]): Unit = {

    val config: Config = ConfigFactory.load()

    // NOTE(review): the key is named "output.parquet.path" but is used here as
    // the *input* of this job — presumably it is the output of an upstream
    // job; confirm against the pipeline configuration.
    val inputParquetPath = config.getString("output.parquet.path")
    val outputJsonPath = config.getString("output.json.path")

    val sc = ContextUtils.getSparkContext()
    val sqlContext: SQLContext = new SQLContext(sc)

    val dataFrame: DataFrame = sqlContext.read.parquet(inputParquetPath)

    // Map each row to ((province, city), 1). String.valueOf guards against
    // null column values, which would otherwise throw a NullPointerException
    // on the bare .toString call.
    val pairCounts: RDD[((String, String), Int)] = dataFrame.map { row =>
      ((String.valueOf(row.getAs[Any]("provincename")),
        String.valueOf(row.getAs[Any]("cityname"))), 1)
    }
    val resultRDD: RDD[((String, String), Int)] = pairCounts.reduceByKey(_ + _)

    // Remove any previous output first, so the write below does not fail
    // with a "path already exists" error on re-runs.
    Utils.deleteFileByCoreSite(sc, outputJsonPath)

    import sqlContext.implicits._
    resultRDD.toDF().repartition(4).write.json(outputJsonPath)

    sc.stop()
  }

}
